package gecko.mqtt;

import gecko.*;
import gecko.lang.AwaitValue;
import gecko.lang.JSONObject;
import gecko.lang.TypedMap;
import gecko.x.Strings;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/**
 * MqttDeviceRouter为MqttVirtualDevice提供一个多设备管理环境，提供MQTT网络通讯环境。
 *
 * @author 陈永佳 (yoojiachen@gmail.com)
 * @version 0.0.1
 */
public class MqttDevicePipeline extends DevicePipeline implements VirtualDevice.FutureTransporter {

    private static final Logger LOGGER = LoggerFactory.getLogger(MqttDevicePipeline.class);

    private final MqttConnectOptions mOpts = new MqttConnectOptions();

    private final Map<String, AwaitValue<EventFrame>> mKVLazyEvents = new ConcurrentHashMap<>();

    private MqttClient mMqttClient;
    private String mDeviceRPCTopic;

    private final MqttCallback mMqttReplyMessagesCallback = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            LOGGER.error("MQTT与Broker连接丢失", cause);
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            final MqttTopic mt;
            try {
                mt = MqttTopic.parse(topic);
            } catch (Exception ex) {
                LOGGER.warn("MQTT接收到未知支持的Replies事件Topic: {}", topic);
                return;
            }
            final String recvKey = makeEventKey(mt.groupAddress, mt.physicalAddress, mt.eventId);
            final AwaitValue<EventFrame> lazy = mKVLazyEvents.get(recvKey);
            if (lazy != null) {
                final EventFrame event = EventFrame.create(mt.eventId, message.getPayload());
                event.headers.put("mqtt.qos", String.valueOf(message.getQos()));
                event.headers.put("mqtt.id", String.valueOf(message.getId()));
                lazy.set(event);
            } else {
                LOGGER.warn("MQTT接收到未知的Replies事件Id： {}, Topic: {}", mt.eventId, topic);
            }
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // nop
        }
    };

    @Override
    public CompletableFuture<EventFrame> transport(EventFrame event, GeckoScoped scoped, Address address) {
        // 提供使用MQTT通讯网络，实现RPC消息通讯；
        return scoped.submitFuture(() -> {
            final String topic = createTopic(scoped, MqttTopic.EVENT_TYPE_REQS, event.id, address);
            final MqttMessage message = new MqttMessage(event.bytes);
            message.setQos(1);
            // 发送
            try {
                mMqttClient.publish(topic, message);
            } catch (MqttException me) {
                LOGGER.error("MQTT发送错误", me);
                return event.newOf(JSONObject.error("MQTT内部错误").toJSONBytes());
            }
            // 等待响应
            final String recvKey = makeEventKey(address.groupAddress(), address.physicalAddress(), event.id);
            final AwaitValue<EventFrame> value = new AwaitValue<>();
            mKVLazyEvents.put(recvKey, value);
            try {
                return value.getAwait();
            } catch (InterruptedException ie) {
                return event.newOf(JSONObject.error("MQTT_VALUE_INTERRUPTED").toJSONBytes());
            } finally {
                mKVLazyEvents.remove(recvKey);
            }
        });
    }

    @Override
    public void onInit(TypedMap initArgs, GeckoScoped scoped) {
        mOpts.setAutomaticReconnect(initArgs.getBoolean("autoReconnect", true));
        mOpts.setCleanSession(initArgs.getBoolean("autoCleanSession", false));
        mOpts.setKeepAliveInterval(initArgs.getInt("keepAliveIntervalSec", 5));
        final String serverAddress = initArgs.getString("broker");
        Strings.requireNotEmpty(serverAddress);

        final String clientId = scoped.getNodeId() + "@" + scoped.getDomain();
        // 监听设备之间的Req-Rep通讯Topic:
        // >  /gecko/nodeId@domain/replies/+/+/+
        mDeviceRPCTopic = createTopic(scoped, MqttTopic.EVENT_TYPE_REPS, "+", "+", "+");

        final String username = initArgs.getString("authUsername");
        final String password = initArgs.getString("authPassword");
        if (!Strings.isNullOrEmpty(username) && !Strings.isNullOrEmpty(password)) {
            mOpts.setPassword(password.toCharArray());
            mOpts.setUserName(username);
            LOGGER.info("MQTT Broker: {}, 客户端ID: {}, 验证帐户: {}@{}", serverAddress, clientId, username, password);
        } else {
            LOGGER.info("MQTT Broker: {}, 客户端ID: {}, 无验证信息", serverAddress, clientId);
        }

        try {
            mMqttClient = new MqttClient(serverAddress, clientId);
            mMqttClient.setCallback(mMqttReplyMessagesCallback);
            mMqttClient.setManualAcks(false);
        } catch (MqttException ex) {
            LOGGER.error("创建MQTT客户端发生错误", ex);
        }
    }

    @Override
    public void onStart(GeckoScoped scoped) {
        // 绑定Mqtt消息发送接口
        forEach(dev -> ((AbstractDevice) dev).setFutureTransporter(this));
        super.onStart(scoped);
        // 连接服务器
        try {
            mMqttClient.connect(mOpts);
            LOGGER.info("MQTT Broker订阅Topic: {}", mDeviceRPCTopic);
            mMqttClient.subscribe(mDeviceRPCTopic);
        } catch (MqttException ex) {
            LOGGER.error("MQTT客户端连接服务器发生错误", ex);
        }
    }

    @Override
    public void onStop(GeckoScoped scoped) {
        try {
            mMqttClient.unsubscribe(mDeviceRPCTopic);
            mMqttClient.disconnect();
        } catch (MqttException ex) {
            LOGGER.error("MQTT客户端断开服务器连接时发生错误", ex);
        } finally {
            super.onStop(scoped);
        }
    }

    @Override
    public String getSupportProtocol() {
        return MqttVirtualDevice.PROTOCOL;
    }


    private static String createTopic(GeckoScoped scoped, String eventType, long eventId, Address addr) {
        return createTopic(scoped, eventType, addr.groupAddress(), addr.physicalAddress(), eventId);
    }

    private static String createTopic(GeckoScoped scoped, String eventType, String group, String physical, Object eventId) {
        // / gecko / {nodeId} @ {domain} / {eventType} / {groupAddr} / {physicalAddr} / {eventId}
        final StringBuilder sb = new StringBuilder("/gecko");
        sb.append("/").append(scoped.getNodeId()).append("@").append(scoped.getDomain());
        sb.append("/").append(eventType);
        sb.append("/").append(group);
        sb.append("/").append(physical);
        sb.append("/").append(eventId);
        return sb.toString();
    }

    private static String makeEventKey(String group, String physical, long eventId) {
        return physical + "@" + group + "#" + eventId;
    }
}
